Process one file


In [1]:
from glob import glob
from itertools import (islice, zip_longest)
from collections import Counter

import numpy as np

import raw_taq

In [2]:
# You can run this if you update the raw_taq.py file
from importlib import reload
reload(raw_taq)


Out[2]:
<module 'raw_taq' from '/home/rdhyee/dlab-finance/basic-taq/raw_taq.py'>

In [3]:
fname = "../local_data/EQY_US_ALL_BBO_20150102.zip"
taq_file = raw_taq.TAQ2Chunks(fname)

In [4]:
# Code to walk through a zip file.

# Function to compute the mapping from record (line) length to the index of
# the last data column.

def record_len_to_last_column(initial_dtype):
    """Map full record length (in bytes) to the index of the last data column.

    initial_dtype is of the form:

    [('Time', 'S9'),
     ('Exchange', 'S1'),
     ...
     ('newline', 'S2')]

    Assumption: the last field is a newline field that is present in all
    versions of BBO.
    """

    cum_len = 0
    cum_lens = []
    flens = [(field, int(dtype[1:])) for (field, dtype) in initial_dtype]
    newline_len = flens[-1][1]

    for (i, (field, flen)) in enumerate(flens[:-1]):
        cum_len += flen
        cum_lens.append((cum_len + newline_len, i))

    return dict(cum_lens)
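
# Illustrative sketch (the exact keys depend on raw_taq.initial_dtype): the
# returned dict maps a full record length, newline included, to the index of
# the last data column present at that length, e.g.
#
#   m = record_len_to_last_column(raw_taq.initial_dtype)
#   dtype_98 = raw_taq.initial_dtype[:m[98] + 1] + [raw_taq.initial_dtype[-1]]
#
# which is how chunks_from_zipfile below rebuilds the dtype for a given file.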
    
    

def raw_chunks_from_zipfile(fname, chunksize=1000):
    """Yield raw byte chunks of chunksize lines from each file inside the zip."""
    import zipfile

    with zipfile.ZipFile(fname, 'r') as zfile:
        for inside_f in zfile.filelist:

            # Two passes: the first pass reads the first two lines to
            # determine the number of bytes per line.

            with zfile.open(inside_f.filename) as infile:
                first = infile.readline()  # we can process the first line here
                second = infile.readline()
                bytes_per_line = len(second)

            with zfile.open(inside_f.filename) as infile:
                first = infile.readline()  # skip past the first line

                still_bytes = True
                while still_bytes:
                    raw_bytes = infile.read(bytes_per_line * chunksize)
                    if raw_bytes:
                        yield raw_bytes
                    else:
                        still_bytes = False

RECORD_LEN_TO_LAST_COLUMN_MAP = record_len_to_last_column(raw_taq.initial_dtype)                


def chunks_from_zipfile(fname, chunksize=1000):
    """Yield numpy structured-array chunks of chunksize lines from each file inside the zip."""
    import zipfile

    with zipfile.ZipFile(fname, 'r') as zfile:
        for inside_f in zfile.filelist:

            with zfile.open(inside_f.filename) as infile:
                first = infile.readline()
                bytes_per_line = len(first)

                # Select the dtype matching this record length, keeping the
                # trailing newline field.
                dtype = (raw_taq.initial_dtype[:RECORD_LEN_TO_LAST_COLUMN_MAP[bytes_per_line] + 1] +
                         [raw_taq.initial_dtype[-1]])

                more_bytes = True

                while more_bytes:
                    raw_bytes = infile.read(bytes_per_line * chunksize)

                    if raw_bytes:
                        all_strings = np.ndarray(len(raw_bytes) // bytes_per_line,
                                                 buffer=raw_bytes, dtype=dtype)
                        yield all_strings
                    else:
                        more_bytes = False
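
As a quick sanity check (a sketch, not output from the original run; it assumes 'Symbol_root' is among the fields in raw_taq.initial_dtype, as the counting code below also does), one can pull a single structured chunk and inspect it:

In [ ]:
# Hedged example: grab the first 10-line chunk and look at its fields.
sample = next(chunks_from_zipfile(fname, chunksize=10))
print(sample.dtype.names)
print(sample[:3]['Symbol_root'])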

In [5]:
def walk_through_zip_raw(fname, chunksize=100000, max_chunk=None):
    """Iterate over raw byte chunks without converting them."""
    for (i, chunk) in enumerate(islice(raw_chunks_from_zipfile(fname, chunksize=chunksize), max_chunk)):
        pass
    return i

def walk_through_zip_init_conv(fname, chunksize=100000, max_chunk=None):
    """Iterate over raw byte chunks, converting each to a structured array."""
    LINE_WIDTH = 98  # will have to generalize this to get the line size from the file

    for (i, chunk) in enumerate(islice(raw_chunks_from_zipfile(fname, chunksize=chunksize), max_chunk)):
        try:
            all_strings = np.ndarray(chunksize, buffer=chunk, dtype=raw_taq.initial_dtype)
        except Exception:
            # The final chunk is usually shorter than chunksize lines.
            all_strings = np.ndarray(len(chunk) // LINE_WIDTH, buffer=chunk, dtype=raw_taq.initial_dtype)

    return i


def walk_through_zip_init_conv_0(fname, chunksize=100000, max_chunk=None):
    """Like walk_through_zip_init_conv, but without handling a short final chunk."""
    for (i, chunk) in enumerate(islice(raw_chunks_from_zipfile(fname, chunksize=chunksize), max_chunk)):
        all_strings = np.ndarray(chunksize, buffer=chunk, dtype=raw_taq.initial_dtype)

    return i
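
To compare the raw pass against the converting passes, one could time each over a bounded number of chunks (a sketch; the figures will vary with machine and file):

In [ ]:
# Hedged example: time raw iteration vs. iteration with dtype conversion.
%time walk_through_zip_raw(fname, chunksize=100000, max_chunk=10)
%time walk_through_zip_init_conv(fname, chunksize=100000, max_chunk=10)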

In [ ]:
# converting an epoch timestamp (seconds) to a datetime;
# 1420230800.94 falls on 2015-01-02 (UTC), the date of the first BBO file above
import datetime
datetime.datetime.fromtimestamp(1420230800.94)
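
For reference (a hedged aside, not output from the original notebook run), converting the same value in UTC avoids local-timezone ambiguity:

In [ ]:
# Hedged example: 1420230800.94 in UTC is 2015-01-02 20:33:20.940000.
datetime.datetime.utcfromtimestamp(1420230800.94)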

In [8]:
# Accumulate (exchange, symbol_root, symbol_suffix)


def count_chunk_elements(fname, chunksize=1000000, max_chunk=None):
    """Count occurrences of each Symbol_root across the file's chunks."""

    symbol_roots = Counter()

    for (i, chunk) in enumerate(islice(chunks_from_zipfile(fname, chunksize), max_chunk)):

        counts = np.unique(chunk[:]['Symbol_root'], return_counts=True)
        symbol_roots.update(dict(zip_longest(counts[0], counts[1])))

        print("\r {0}".format(i), end="")

    return symbol_roots
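
Before the full run below, one can limit max_chunk to confirm the call pattern cheaply (a sketch; the counts it returns are partial by construction):

In [ ]:
# Hedged example: count Symbol_root over only the first two 1,000,000-line chunks.
partial = count_chunk_elements(fname, chunksize=1000000, max_chunk=2)
partial.most_common(5)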

In [32]:
#fname = "../local_data/EQY_US_ALL_BBO_20150102.zip"
fname = "../local_data/EQY_US_ALL_BBO_20100104.zip"

In [33]:
%time c = count_chunk_elements(fname, max_chunk=None)


 325
CPU times: user 1min 44s, sys: 13.8 s, total: 1min 57s
Wall time: 1min 57s

In [34]:
sum(c.values())


Out[34]:
325268422

In [31]:
for (i, (k, v)) in enumerate(islice(c.most_common(), 10)):
    print("\t".join([str(i), k.decode('utf-8').strip(), str(v)]))


0	SPY	10234301
1	QQQ	6371754
2	VXX	6180076
3	IWM	4076960
4	XLE	3995591
5	XIV	3337509
6	DIA	3216624
7	QLD	3071839
8	AAPL	2878822
9	UPRO	2744896

Try the new TAQ2Chunks


In [16]:
from glob import glob
from itertools import (islice, zip_longest)
from collections import Counter

import numpy as np

import raw_taq

In [23]:
import raw_taq
#fname = "../local_data/EQY_US_ALL_BBO_20150102.zip"
fname = "../local_data/EQY_US_ALL_BBO_20100104.zip"
chunks = raw_taq.TAQ2Chunks(fname, chunksize=1, process_chunk=False)
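
With chunksize=1 each yielded chunk holds a single record, so pulling one with next() gives a quick look at the raw fields (a sketch; it relies only on TAQ2Chunks being iterable, as the counting loop below already assumes):

In [ ]:
# Hedged example: peek at the first unprocessed record.
first_record = next(iter(chunks))
first_record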

In [24]:
from collections import Counter
from itertools import islice
import numpy as np

def count_chunk_elements1(fname, chunksize=1000000, max_chunk=None, process_chunk=False):
    """Count occurrences of each Symbol_root, iterating via raw_taq.TAQ2Chunks."""

    symbol_roots = Counter()

    for (i, chunk) in enumerate(islice(raw_taq.TAQ2Chunks(fname,
                                                          chunksize=chunksize,
                                                          process_chunk=process_chunk), max_chunk)):

        counts = np.unique(chunk[:]['Symbol_root'], return_counts=True)
        symbol_roots.update(dict(zip_longest(counts[0], counts[1])))

        print("\r {0}".format(i), end="")

    return symbol_roots

In [25]:
%time c = count_chunk_elements1(fname, max_chunk=None)


 325
CPU times: user 1min 45s, sys: 14.8 s, total: 2min
Wall time: 2min 12s

In [26]:
sum(c.values())


Out[26]:
325268422

In [27]:
for (i, (k, v)) in enumerate(islice(c.most_common(), 10)):
    print("\t".join([str(i), k.decode('utf-8').strip(), str(v)]))


0	PEF	3693841
1	UPRO	3096180
2	USO	1742436
3	C	1642334
4	QQQQ	1542761
5	BAC	1504047
6	GLD	1499900
7	IWM	1448921
8	SPY	1392723
9	GDX	1332082
